package mqtt.config;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import mqtt.annotation.Topic;
import mqtt.receiver.MqttReceiverMessageHandler;
import mqtt.utils.ClassPathScanningProvider;
import mqtt.utils.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import java.util.HashSet;
import java.util.Set;

/**
 * Spring configuration that wires the MQTT client factory, the inbound (receiver) channel
 * adapter and the outbound (sender) message handler using Spring Integration and Eclipse Paho.
 *
 * <p>Created: 2022/9/23 15:56
 *
 * @author Lamb
 */
@ComponentScan(basePackages = {"mqtt"})
@Slf4j
@Configuration
@IntegrationComponentScan
@AllArgsConstructor
public class MqttConfig {

    private final MqttProperties mqttProperties;

    private final ReceiverProperties receiverProperties;

    private final SenderProperties senderProperties;

    //----------------------------------- Client factory configuration -----------------------------------//

    /**
     * Builds the Paho connection options from {@link MqttProperties}.
     *
     * <p>The password is only applied when one is configured, so a configuration without a
     * password no longer fails with a {@link NullPointerException}.
     *
     * @return the connection options shared by the client factory
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        // Log individual non-secret settings instead of the whole properties object, whose
        // string form may include the password. NOTE(review): confirm MqttProperties#toString.
        log.info("--- -> serverUri={}, username={}, cleanSession={}, connectionTimeout={}, "
                        + "keepAliveInterval={}, automaticReconnect={}",
                mqttProperties.getServerUri(),
                mqttProperties.getUsername(),
                mqttProperties.isCleanSession(),
                mqttProperties.getConnectionTimeout(),
                mqttProperties.getKeepAliveInterval(),
                mqttProperties.isAutomaticReconnect());

        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setCleanSession(mqttProperties.isCleanSession());
        mqttConnectOptions.setConnectionTimeout(mqttProperties.getConnectionTimeout());
        mqttConnectOptions.setUserName(mqttProperties.getUsername());

        // The password is optional; calling toCharArray() on a missing value would throw.
        String password = mqttProperties.getPassword();
        if (password != null) {
            mqttConnectOptions.setPassword(password.toCharArray());
        }

        mqttConnectOptions.setServerURIs(new String[]{mqttProperties.getServerUri()});
        mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
        mqttConnectOptions.setAutomaticReconnect(mqttProperties.isAutomaticReconnect());
        return mqttConnectOptions;
    }

    /**
     * Creates the MQTT client factory, configured with {@link #getMqttConnectOptions()}.
     *
     * @return the client factory used by both the receiver adapter and the sender handler
     */
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    //----------------------------------- MQTT receiver configuration -----------------------------------//

    /**
     * Channel that carries messages from the inbound adapter to the receiver handler.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel receiverMessageChannel() {
        return new DirectChannel();
    }

    /**
     * Creates the inbound adapter that subscribes to every topic declared through a
     * {@link Topic} annotation under the configured base package and forwards received
     * messages to {@link #receiverMessageChannel()}.
     *
     * @return the inbound message-driven channel adapter
     * @throws Exception if scanning the topic base package fails
     */
    @Bean
    public MessageProducer receiverMessageProducer() throws Exception {
        String[] topics = resolveSubscribedTopics();

        // A generated suffix is appended to the configured client id.
        // NOTE(review): presumably to keep client ids unique per instance; confirm StringUtils.uuid().
        String clientId = receiverProperties.getClientId() + "_" + StringUtils.uuid();

        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId, mqttPahoClientFactory(), topics);
        adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout());
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(receiverProperties.getQos());
        adapter.setOutputChannel(receiverMessageChannel());
        return adapter;
    }

    /**
     * Collects the distinct topic names of all {@link Topic}-annotated classes found under
     * {@code mqttProperties.getTopicBasePackage()}.
     *
     * @return the topic names to subscribe to; empty when no annotated class is found
     * @throws Exception if the class path scan fails
     */
    private String[] resolveSubscribedTopics() throws Exception {
        ClassPathScanningProvider classPathScanningProvider = new ClassPathScanningProvider();
        // Only keep classes that carry the @Topic annotation.
        classPathScanningProvider.addCandidateFilter(
                (String className, Class<?> c) -> c.isAnnotationPresent(Topic.class));

        Set<Class> classSet = classPathScanningProvider.findClassSet(mqttProperties.getTopicBasePackage());
        Set<String> topicSet = new HashSet<>();
        for (Class<?> candidate : classSet) {
            Topic topic = candidate.getAnnotation(Topic.class);
            topicSet.add(topic.name());
        }
        log.info("MQTT初始化订阅消息列表: {}", topicSet);
        return topicSet.toArray(new String[0]);
    }

    /**
     * Registers the receiver handler as the service activator for
     * {@code receiverMessageChannel}.
     *
     * @return the handler bean produced by {@link #mqttReceiverMessageHandler()}
     */
    @Bean
    @ServiceActivator(inputChannel = "receiverMessageChannel")
    public MessageHandler receiverMessageHandler() {
        return mqttReceiverMessageHandler();
    }

    /**
     * Exposes the concrete receiver handler as a bean.
     *
     * @return a new {@link MqttReceiverMessageHandler}
     */
    @Bean
    public MqttReceiverMessageHandler mqttReceiverMessageHandler() {
        return new MqttReceiverMessageHandler();
    }


    //----------------------------------- MQTT sender configuration -----------------------------------//

    /**
     * Channel on which outbound messages are handed to the sender handler.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel senderMessageChannel() {
        return new DirectChannel();
    }

    /**
     * Creates the outbound handler that consumes {@code senderMessageChannel}. It is set to
     * asynchronous mode and uses the sender's configured topic as its default topic.
     *
     * @return the outbound MQTT message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "senderMessageChannel")
    public MessageHandler senderMessageHandler() {
        // A generated suffix is appended to the configured client id.
        // NOTE(review): presumably to keep client ids unique per instance; confirm StringUtils.uuid().
        String clientId = senderProperties.getClientId() + "_" + StringUtils.uuid();
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttPahoClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(senderProperties.getTopic());
        return messageHandler;
    }
}
